package com.zj.es.test.util;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@Slf4j
public class Mysql2ESUtil {
    public static final String url = "jdbc:mysql://localhost:3306/lagou_position?charset=utf8mb4&useSSL=false";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    private volatile static Connection connection = null;

    private static Connection getConn() {

        if (connection == null) {
            synchronized (Mysql2ESUtil.class) {
                if (connection == null) {
                    try {
                        Class.forName(name);
                        connection = DriverManager.getConnection(url, user, password);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

        }
        return connection;
    }

    public  static void  writeMySQLDataToES(String tableName,String indexName,RestHighLevelClient client){
        BulkProcessor bulkProcessor  = getBulkProcessor(client);
        Connection  connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            connection =getConn();
            String  sql = "select * from " + tableName;
            ps = connection.prepareStatement(sql,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);

            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String,String>> dataList = new ArrayList<>();
            HashMap<String,String>  map = null;
            int  count = 0;

            String  field = null;
            String  value = null;
            while(rs.next()){
                count ++;
                map = new HashMap<String,String>(128);
                for (int i=1;i< colData.getColumnCount();i++){
                    field = colData.getColumnName(i);
                    value = rs.getString(field);
                    map.put(field,value);
                }
                dataList.add(map);

                if (count % 10000 == 0){
                    // 将数据添加到 bulkProcessor
                    for (HashMap<String,String> hashMap2 : dataList){
                        bulkProcessor.add(new IndexRequest(indexName).source(hashMap2));
                    }
                    // 每提交一次 清空 map 和  dataList
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理 未提交的数据
            for (HashMap<String,String> hashMap2 : dataList){
                bulkProcessor.add(new IndexRequest(indexName).source(hashMap2));
            }
            bulkProcessor.flush();

        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                rs.close();
                ps.close();
                connection.close();
                boolean  terinaFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }


    private static BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor bulkProcessor = null;
        try {

            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    log.info("Try to insert data number : "
                            + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    log.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    log.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
                }
            };

            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                    .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // 注意点：让参数设置生效
            bulkProcessor = builder.build();

        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                log.error(e1.getMessage());
            }
        }
        return bulkProcessor;
    }

}
